---
title: Multiple Events and Multiple Algorithms
---

<!--
Licensed to the Apache Software Foundation (ASF) under one or more
contributor license agreements.  See the NOTICE file distributed with
this work for additional information regarding copyright ownership.
The ASF licenses this file to You under the Apache License, Version 2.0
(the "License"); you may not use this file except in compliance with
the License.  You may obtain a copy of the License at

    http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
-->

This is a more advanced example; we recommend going through the [DASE](dase.html) explanation first.

The [default algorithm described in DASE](dase.html#algorithm) uses user-to-item view events as training data. However, your application may have more than one type of event that you want to take into account, such as buy, rate and like events. One way to incorporate other types of events is to add another algorithm that processes these events and builds a separate model, and then to combine the outputs of the multiple algorithms during Serving.

In this example, we will add another algorithm to process like/dislike events. The final PredictedResults will be the combined outputs of both algorithms.

NOTE: This is just one way to handle multiple types of events. We use this use case to demonstrate how to build an engine with multiple algorithms. You may also build a single algorithm that takes different events into account instead of using multiple algorithms.

This example will demonstrate the following:

- Read multiple types of events
- Use positive and negative implicit events such as like and dislike with the MLlib ALS algorithm
- Integrate multiple algorithms into one engine

The complete source code of this example can be found [here](https://github.com/apache/predictionio/tree/develop/examples/scala-parallel-similarproduct/multi-events-multi-algos).

### Step 1. Read "like" and "dislike" events as TrainingData

Modify the following in DataSource.scala:

- In addition to the original `ViewEvent` class, add a new class `LikeEvent` with a boolean `like` field that indicates whether the event is a like or a dislike.
- Add a new field `likeEvents` to the `TrainingData` class to store the `RDD[LikeEvent]`.
- Modify DataSource's `readTraining()` function to read "like" and "dislike" events from the Event Store.

The modification is shown below:

```scala

class DataSource(val dsp: DataSourceParams)
  extends PDataSource[TrainingData,
      EmptyEvaluationInfo, Query, EmptyActualResult] {

  override
  def readTraining(sc: SparkContext): TrainingData = {

    ...

    // get all "user" "view" "item" events
    val viewEventsRDD: RDD[ViewEvent] = ...

    // ADDED
    // get all "user" "like" and "dislike" "item" events
    val likeEventsRDD: RDD[LikeEvent] = PEventStore.find(
      appName = dsp.appName,
      entityType = Some("user"),
      eventNames = Some(List("like", "dislike")),
      // targetEntityType is an optional field of an event.
      targetEntityType = Some(Some("item")))(sc)
      // PEventStore.find() returns RDD[Event]
      .map { event =>
        val likeEvent = try {
          event.event match {
            case "like" | "dislike" => LikeEvent(
              user = event.entityId,
              item = event.targetEntityId.get,
              t = event.eventTime.getMillis,
              like = (event.event == "like"))
            case _ => throw new Exception(s"Unexpected event ${event} is read.")
          }
        } catch {
          case e: Exception => {
            logger.error(s"Cannot convert ${event} to LikeEvent." +
              s" Exception: ${e}.")
            throw e
          }
        }
        likeEvent
      }.cache()

    new TrainingData(
      users = usersRDD,
      items = itemsRDD,
      viewEvents = viewEventsRDD,
      likeEvents = likeEventsRDD // ADDED
    )
  }
}

...

case class LikeEvent( // ADDED
  user: String,
  item: String,
  t: Long,
  like: Boolean // true: like. false: dislike
)

class TrainingData(
  val users: RDD[(String, User)],
  val items: RDD[(String, Item)],
  val viewEvents: RDD[ViewEvent],
  val likeEvents: RDD[LikeEvent] // ADDED
) extends Serializable {
  override def toString = {
    s"users: [${users.count()} (${users.take(2).toList}...)]" +
    s"items: [${items.count()} (${items.take(2).toList}...)]" +
    s"viewEvents: [${viewEvents.count()}] (${viewEvents.take(2).toList}...)" +
    // ADDED
    s"likeEvents: [${likeEvents.count()}] (${likeEvents.take(2).toList}...)"
  }
}
```

### Step 2. Modify Preparator and PreparedData

In Preparator.scala, simply pass the newly added `likeEvents` from `TrainingData` to `PreparedData`, as shown in the code below:

```scala

class Preparator
  extends PPreparator[TrainingData, PreparedData] {

  def prepare(sc: SparkContext, trainingData: TrainingData): PreparedData = {
    new PreparedData(
      users = trainingData.users,
      items = trainingData.items,
      viewEvents = trainingData.viewEvents,
      likeEvents = trainingData.likeEvents) // ADDED
  }
}

class PreparedData(
  val users: RDD[(String, User)],
  val items: RDD[(String, Item)],
  val viewEvents: RDD[ViewEvent],
  val likeEvents: RDD[LikeEvent] // ADDED
) extends Serializable

```

### Step 3. Add a new algorithm to train model with `likeEvents`

For demonstration purposes, we also use MLlib ALS to train a model with `likeEvents`, so the new algorithm class shares much of its logic with the original algorithm. The only difference is the `train()` function: the original one trains a model with `viewEvents` while the new one uses `likeEvents`. In this case, we can simply create a new algorithm that extends the original `ALSAlgorithm` class and overrides the `train()` function.

NOTE: You may also create a completely new algorithm to process `likeEvents` or other types of events without extending any existing algorithm, or simply modify the same algorithm to take the new events into account.

#### Using positive and negative implicit events (without explicit rating) with MLlib ALS

In the original `ALSAlgorithm`, the `train()` function counts the number of times the user has viewed the same item and then maps that count to an `MLlibRating` object. A like/dislike event, however, is a boolean, one-time preference, so it doesn't make sense to aggregate the events if the user has multiple like/dislike events on the same item. A user may also like an item and later change her mind and dislike it, or vice versa. In this case, we can simply use the user's latest like or dislike event for that item.

In addition, MLlib ALS can handle negative preferences with `ALS.trainImplicit()`. Hence, we can map a dislike to a rating of -1 and a like to a rating of 1.

INFO: Negative preferences do not work with `ALS.train()`, which is intended for explicit ratings such as "rate" events.
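
The "latest event wins" rule and the mapping to 1 / -1 can be tried outside Spark with plain Scala collections. The following is only a minimal sketch under that assumption: `LatestLikeSketch`, `latestRatings` and the sample events are hypothetical names and data made up for illustration, and events with identical timestamps are not treated specially.

```scala
// Plain-Scala sketch (no Spark): keep the latest like/dislike event per
// (user, item) pair and map it to an implicit rating of 1.0 or -1.0.
case class LikeEvent(user: String, item: String, t: Long, like: Boolean)

object LatestLikeSketch {

  // Returns (user, item) -> rating, where rating is 1.0 for a like
  // and -1.0 for a dislike, based on the event with the largest timestamp.
  def latestRatings(events: Seq[LikeEvent]): Map[(String, String), Double] =
    events
      .groupBy(e => (e.user, e.item))
      .map { case (key, es) =>
        val latest = es.maxBy(_.t)
        key -> (if (latest.like) 1.0 else -1.0)
      }

  // Hypothetical sample data: u1 likes i1 and later dislikes it,
  // u2 dislikes i1 and later likes it.
  val sample: Seq[LikeEvent] = Seq(
    LikeEvent("u1", "i1", t = 100L, like = true),
    LikeEvent("u1", "i1", t = 200L, like = false),
    LikeEvent("u1", "i2", t = 150L, like = true),
    LikeEvent("u2", "i1", t = 300L, like = false),
    LikeEvent("u2", "i1", t = 400L, like = true)
  )
}
```

With this sample data, `LatestLikeSketch.latestRatings(LatestLikeSketch.sample)` keeps one entry per (user, item) pair: the later dislike of `i1` by `u1` yields -1.0, while the later like of `i1` by `u2` yields 1.0.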

In summary, this new `LikeAlgorithm` does the following:

- Extends the original `ALSAlgorithm` class
- Overrides `train()` to process the `likeEvents` in `PreparedData`
- Uses the latest event if the user likes/dislikes the same item multiple times
- Maps a dislike to an `MLlibRating` object with a rating of -1 and a like to a rating of 1
- Uses the `MLlibRating` objects to train the `ALSModel` in the same way as the original `ALSAlgorithm`
- Keeps the same `predict()` function as the original `ALSAlgorithm`

It is shown in the code below:

```scala
package org.apache.predictionio.examples.similarproduct

import org.apache.predictionio.data.storage.BiMap

import org.apache.spark.SparkContext
import org.apache.spark.SparkContext._
import org.apache.spark.mllib.recommendation.ALS
import org.apache.spark.mllib.recommendation.{Rating => MLlibRating}

import grizzled.slf4j.Logger

// ADDED
// Extend original ALSAlgorithm and override train() function to handle
// like and dislike events
class LikeAlgorithm(ap: ALSAlgorithmParams) extends ALSAlgorithm(ap) {

  @transient lazy override val logger = Logger[this.type]

  override
  def train(sc: SparkContext, data: PreparedData): ALSModel = {
    require(!data.likeEvents.take(1).isEmpty,
      s"likeEvents in PreparedData cannot be empty." +
      " Please check if DataSource generates TrainingData" +
      " and Preparator generates PreparedData correctly.")
    require(!data.users.take(1).isEmpty,
      s"users in PreparedData cannot be empty." +
      " Please check if DataSource generates TrainingData" +
      " and Preparator generates PreparedData correctly.")
    require(!data.items.take(1).isEmpty,
      s"items in PreparedData cannot be empty." +
      " Please check if DataSource generates TrainingData" +
      " and Preparator generates PreparedData correctly.")
    // create User and item's String ID to integer index BiMap
    val userStringIntMap = BiMap.stringInt(data.users.keys)
    val itemStringIntMap = BiMap.stringInt(data.items.keys)

    // collect Item as Map and convert ID to Int index
    val items: Map[Int, Item] = data.items.map { case (id, item) =>
      (itemStringIntMap(id), item)
    }.collectAsMap.toMap

    val mllibRatings = data.likeEvents
      .map { r =>
        // Convert user and item String IDs to Int index for MLlib
        val uindex = userStringIntMap.getOrElse(r.user, -1)
        val iindex = itemStringIntMap.getOrElse(r.item, -1)

        if (uindex == -1)
          logger.info(s"Couldn't convert nonexistent user ID ${r.user}"
            + " to Int index.")

        if (iindex == -1)
          logger.info(s"Couldn't convert nonexistent item ID ${r.item}"
            + " to Int index.")

        // key is (uindex, iindex) tuple, value is (like, t) tuple
        ((uindex, iindex), (r.like, r.t))
      }.filter { case ((u, i), v) =>
        // keep events with valid user and item index
        (u != -1) && (i != -1)
      }.reduceByKey { case (v1, v2) => // MODIFIED
        // A user may like an item and change to dislike it later,
        // or vice versa. Use the latest value for this case.
        val (like1, t1) = v1
        val (like2, t2) = v2
        // keep the latest value
        if (t1 > t2) v1 else v2
      }.map { case ((u, i), (like, t)) => // MODIFIED
        // With ALS.trainImplicit(), we can use a negative value to indicate
        // a negative signal (i.e. dislike)
        val r = if (like) 1 else -1
        // MLlibRating requires integer index for user and item
        MLlibRating(u, i, r)
      }
      .cache()

    // MLlib ALS cannot handle empty training data.
    require(!mllibRatings.take(1).isEmpty,
      s"mllibRatings cannot be empty." +
      " Please check if your events contain valid user and item ID.")
    // seed for MLlib ALS
    val seed = ap.seed.getOrElse(System.nanoTime)

    val m = ALS.trainImplicit(
      ratings = mllibRatings,
      rank = ap.rank,
      iterations = ap.numIterations,
      lambda = ap.lambda,
      blocks = -1,
      alpha = 1.0,
      seed = seed)

    new ALSModel(
      productFeatures = m.productFeatures.collectAsMap.toMap,
      itemStringIntMap = itemStringIntMap,
      items = items
    )
  }

}

```

### Step 4. Modify Serving to combine multiple algorithms' outputs

When the engine is deployed, the Query is sent to all algorithms of the engine. The PredictedResults returned by all algorithms are passed to the Serving component for further processing, which is why the `predictedResults` argument of the `serve()` function is of type `Seq[PredictedResult]`.

In this example, the `serve()` function first standardizes the PredictedResults of each algorithm so that the scores of multiple algorithms can be combined by adding up the scores of the same item. It then takes the top N items, with N given by `query.num`.

Modify Serving.scala as shown below:

```scala

class Serving
  extends LServing[Query, PredictedResult] {

  override
  def serve(query: Query,
    predictedResults: Seq[PredictedResult]): PredictedResult = {

    // MODIFIED
    val standard: Seq[Array[ItemScore]] = if (query.num == 1) {
      // if the query asks for only 1 item, don't standardize
      predictedResults.map(_.itemScores)
    } else {
      // Standardize the scores before combining them
      val mvList: Seq[MeanAndVariance] = predictedResults.map { pr =>
        meanAndVariance(pr.itemScores.map(_.score))
      }

      predictedResults.zipWithIndex
        .map { case (pr, i) =>
          pr.itemScores.map { is =>
            // standardize score (z-score)
            // if standard deviation is 0 (when all items have the same score,
            // meaning all items are ranked equally), return 0.
            val score = if (mvList(i).stdDev == 0) {
              0
            } else {
              (is.score - mvList(i).mean) / mvList(i).stdDev
            }

            ItemScore(is.item, score)
          }
        }
    }

    // sum the standardized score if same item
    val combined = standard.flatten // Array of ItemScore
      .groupBy(_.item) // groupBy item id
      .mapValues(itemScores => itemScores.map(_.score).reduce(_ + _))
      .toArray // array of (item id, score)
      .sortBy(_._2)(Ordering.Double.reverse)
      .take(query.num)
      .map { case (k,v) => ItemScore(k, v) }

    PredictedResult(combined)
  }
}

```
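
The standardize-and-sum step can also be explored in isolation. The sketch below is a simplified, self-contained illustration rather than the template's implementation: `CombineSketch`, `standardize`, `combine` and the sample scores are hypothetical, it always standardizes (it omits the `query.num == 1` shortcut), and it assumes the population standard deviation, whereas the `meanAndVariance` helper used above may use a different variance estimator.

```scala
// Plain-Scala sketch: z-score each algorithm's scores, sum the standardized
// scores per item, and keep the top `num` items.
case class ItemScore(item: String, score: Double)

object CombineSketch {

  // Standardize scores to z-scores. If the standard deviation is 0
  // (all items share the same score), every standardized score is 0.
  def standardize(scores: Seq[ItemScore]): Seq[ItemScore] =
    if (scores.isEmpty) scores
    else {
      val mean = scores.map(_.score).sum / scores.size
      // Assumption: population variance (divide by n)
      val variance =
        scores.map(s => math.pow(s.score - mean, 2)).sum / scores.size
      val stdDev = math.sqrt(variance)
      scores.map { s =>
        ItemScore(s.item, if (stdDev == 0) 0.0 else (s.score - mean) / stdDev)
      }
    }

  // Sum the standardized scores of the same item across all algorithms
  // and return the `num` highest-scoring items.
  def combine(results: Seq[Seq[ItemScore]], num: Int): Seq[ItemScore] =
    results
      .flatMap(standardize)
      .groupBy(_.item)
      .map { case (item, ss) => ItemScore(item, ss.map(_.score).sum) }
      .toSeq
      .sortWith(_.score > _.score)
      .take(num)

  // Hypothetical outputs of two algorithms on very different score scales
  val als: Seq[ItemScore] = Seq(ItemScore("i1", 3.0), ItemScore("i2", 1.0))
  val like: Seq[ItemScore] =
    Seq(ItemScore("i2", 10.0), ItemScore("i3", 20.0), ItemScore("i1", 30.0))
}
```

Calling `CombineSketch.combine(Seq(CombineSketch.als, CombineSketch.like), 2)` ranks `i1` first and `i3` second. Without standardization, the second algorithm's larger raw scores would dominate the sum.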

NOTE: You may combine results of multiple algorithms in different ways based on your requirements.


### Step 5. Modify Engine.scala and engine.json

Modify Engine.scala to include the new algorithm `LikeAlgorithm` class and give it the name `"likealgo"` as shown below:


```scala
...

object SimilarProductEngine extends EngineFactory {
  def apply() = {
    new Engine(
      classOf[DataSource],
      classOf[Preparator],
      Map(
        "als" -> classOf[ALSAlgorithm],
        "cooccurrence" -> classOf[CooccurrenceAlgorithm],
        "likealgo" -> classOf[LikeAlgorithm]), // ADDED
      classOf[Serving])
  }
}

...

```

Next, in order to train and deploy two algorithms for this engine, we also need to modify engine.json to include the new algorithm. The `"algorithms"` parameter is an array of each algorithm's name and parameters. By default, it contains a single algorithm, `"als"`, and its parameters. Add another JSON object for the new algorithm named `"likealgo"` and its parameters to the `"algorithms"` array, as shown below:


```json

{
  ...

  "algorithms": [
    {
      "name": "als",
      "params": {
        "rank": 10,
        "numIterations" : 20,
        "lambda": 0.01,
        "seed": 3
      }
    },
    {
      "name": "likealgo",
      "params": {
        "rank": 8,
        "numIterations" : 15,
        "lambda": 0.01,
        "seed": 3
      }
    }
  ]
}

```

INFO: You may notice that the parameters of the new `"likealgo"` contain the same fields as `"als"`. This is because the `LikeAlgorithm` class extends the original `ALSAlgorithm` class and shares the same algorithm parameter class definition. If another algorithm you add has its own parameter class, you just need to specify its parameters inside the `params` field accordingly.
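
As a rough illustration of how the `params` fields line up with a parameter class, the sketch below uses a stand-in `Params` trait so that it runs without PredictionIO on the classpath. The fields of `ALSAlgorithmParams` are inferred from how `ap` is used in `LikeAlgorithm` above (`ap.rank`, `ap.numIterations`, `ap.lambda`, `ap.seed`). `MyAlgorithmParams`, its `threshold` field and `ParamsSketch` are purely hypothetical.

```scala
// Stand-in for the framework's parameter marker trait, defined here only
// so that this sketch is self-contained (assumption, not the real trait).
trait Params extends Serializable

// Fields mirror the "params" keys of "als" and "likealgo" in engine.json.
case class ALSAlgorithmParams(
  rank: Int,
  numIterations: Int,
  lambda: Double,
  seed: Option[Long]) extends Params

// Hypothetical parameter class for some other algorithm. Its engine.json
// entry would then carry its own keys, e.g. "params": { "threshold": 0.5 }.
case class MyAlgorithmParams(threshold: Double) extends Params

object ParamsSketch {
  // The values of the "likealgo" entry above, written as a parameter object
  val likeAlgoParams: ALSAlgorithmParams = ALSAlgorithmParams(
    rank = 8, numIterations = 15, lambda = 0.01, seed = Some(3L))

  // Hypothetical values for the hypothetical algorithm
  val myAlgoParams: MyAlgorithmParams = MyAlgorithmParams(threshold = 0.5)
}
```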

That's it! Now you have an engine configured with two algorithms.


### Sample data with "like" and "dislike" events

For demonstration purposes, a sample import script is also provided so that you can quickly test this engine. The script is modified from the original one used in [Quick Start](quickstart.html#import-more-sample-data), with the addition of importing like and dislike events.

You can find the import script in `data/import_eventserver.py`.

Make sure you are under the App directory. Execute the following to import the data (replace the value of the `access_key` parameter with your **Access Key**):

```
$ python data/import_eventserver.py --access_key 3mZWDzci2D5YsqAnqNnXH9SB6Rg3dsTBs8iHkK6X2i54IQsIZI1eEeQQyMfs7b3F
```

WARNING: If you see the error **TypeError: __init__() got an unexpected keyword argument 'access_key'**,
please update the Python SDK to the latest version.


You are now ready to run `pio build`, `pio train` and `pio deploy` as described in the [Quick Start](quickstart.html#5.-deploy-the-engine-as-a-service).
